package com.jhf.youke.flink.domain.service;

import com.jhf.youke.flink.domain.model.dto.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demo of basic Flink DataStream transformations (map, flatMap, filter,
 * keyBy and a rolling maxBy aggregation) applied to text lines read from a
 * local socket.
 *
 * <p>Each public method builds its own pipeline on the shared execution
 * environment and then blocks in {@code env.execute()}.
 *
 * @author RHJ
 * @since 2022/11/21
 */
public class TransformService {

    /** Host of the text socket used as the demo source. */
    private static final String SOCKET_HOST = "localhost";

    /** Port of the text socket; feed it with e.g. {@code nc -lk 777}. */
    private static final int SOCKET_PORT = 777;

    /** Minimum number of space-separated fields a sensor line must contain. */
    private static final int SENSOR_FIELD_COUNT = 3;

    /** Execution environment every pipeline in this class is built on. */
    private final StreamExecutionEnvironment env;

    /** Creates the service using the execution environment Flink picks for the current context. */
    public TransformService(){
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
    }

    /**
     * Reads lines from the socket and prints four streams: the raw lines, the
     * length of each line, the space-separated tokens of each line, and only
     * the lines that start with {@code "hello"}.
     *
     * @throws Exception if the job fails while executing
     */
    public void wordCount() throws Exception{
        // Simulate input by running `nc -lk 777` (e.g. inside the Windows Subsystem for Linux).
        DataStream<String> dataStream = env.socketTextStream(SOCKET_HOST, SOCKET_PORT);

        dataStream.print();

        // 1. map: one output per input line -> the line's length.
        DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String s) throws Exception {
                return s.length();
            }
        });

        mapStream.print();

        // 2. flatMap: zero or more outputs per input line -> one per space-separated token.
        DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                String[] split = s.split(" ");
                for (String val : split) {
                    collector.collect(val);
                }
            }
        });

        flatMapStream.print("flatMapStream");

        // 3. filter: keep only the lines that start with "hello".
        DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return value.startsWith("hello");
            }
        });

        filterStream.print("filterStream");

        env.execute();
    }

    /**
     * Reads sensor lines of the form {@code "<id> <text> <number>"} from the
     * socket, keys them by {@code SensorReading::getId} and prints the result
     * of {@code maxBy("num")} for each incoming element.
     *
     * @throws Exception if the job fails while executing, including when an
     *                   input line cannot be parsed
     */
    public void wordCountBy() throws Exception{
        // Simulate input by running `nc -lk 777` (e.g. inside the Windows Subsystem for Linux).
        DataStream<String> dataStream = env.socketTextStream(SOCKET_HOST, SOCKET_PORT);

        DataStream<SensorReading> sensorStream = dataStream.map(TransformService::parseSensorReading);

        KeyedStream<SensorReading, Long> keyedStream = sensorStream.keyBy(SensorReading::getId);

        // NOTE(review): assumes SensorReading exposes a field named "num" - confirm against the DTO.
        DataStream<SensorReading> outStream = keyedStream.maxBy("num");

        outStream.print();

        env.execute();
    }

    /**
     * Parses one space-separated input line into a {@link SensorReading}.
     *
     * @param line raw text line; the first field must parse as a long and the
     *             third as a double
     * @return the reading built from the first three fields
     * @throws IllegalArgumentException if the line has fewer than three fields
     *                                  or a numeric field is malformed
     */
    private static SensorReading parseSensorReading(String line) {
        String[] fields = line.split(" ");
        // Fail with the offending line in the message instead of a bare index error.
        if (fields.length < SENSOR_FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected at least " + SENSOR_FIELD_COUNT + " space-separated fields but got "
                            + fields.length + " in line: '" + line + "'");
        }
        return new SensorReading(Long.valueOf(fields[0]), fields[1], Double.valueOf(fields[2]));
    }

    /**
     * Runs the keyed {@code maxBy} demo.
     *
     * @param args unused
     * @throws Exception if the job fails while executing
     */
    public static void main(String[] args) throws Exception {
        TransformService transformService = new TransformService();
        transformService.wordCountBy();
    }

}
